package com.sikong.calcite.csv;

import com.sikong.constant.StringSpliter;
import lombok.extern.slf4j.*;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Source;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Demo Calcite table backed by a CSV stream whose first line declares the columns.
 *
 * <p>The header line has the form {@code NAME:TYPE,NAME:TYPE,...}, for example
 * {@code ID:VARCHAR,NAME1:VARCHAR,NAME2:VARCHAR}; each {@code TYPE} is resolved
 * through {@link SqlTypeName#get(String)}.
 *
 * <p>The table does not own the supplied {@link FileReader}: it reads exactly the
 * header line from it (once, lazily) and never closes it, because the very same
 * reader instance is handed to the row enumerator in {@link #scan(DataContext)}.
 *
 * <p>NOTE(review): after the row type has been resolved the reader is positioned
 * on the first character following the header line. Confirm that
 * {@code CsvEnumeratorDemo} expects to start at the first data row.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @author wangbohong
 * @date 2022/03/09 16:26:22
 * @since 0.1.0
 */
@Slf4j
public class CsvTableDemo extends AbstractTable implements ScannableTable {

    /** Shared reader over the CSV file; also consumed by the enumerator built in {@code scan}. */
    private final FileReader fileReader;

    /** Whether the header line has already been consumed from {@link #fileReader}. */
    private boolean headerRead;

    /** Cached header line; {@code null} when the stream was empty or not read yet. */
    private String headerLine;

    /**
     * Creates a table over an already opened CSV reader.
     *
     * @param fileReader reader positioned at the header line; it stays owned by the caller
     */
    public CsvTableDemo(FileReader fileReader) {
        this.fileReader = fileReader;
    }

    /**
     * Returns an enumerable whose enumerators read rows from the shared reader.
     *
     * @param dataContext execution context supplied by Calcite (unused here)
     * @return an enumerable producing a new {@code CsvEnumeratorDemo} per call to
     *         {@code enumerator()}, each wrapping the same reader instance
     */
    @Override
    public Enumerable<Object[]> scan(DataContext dataContext) {
        return new AbstractEnumerable<Object[]>() {
            @Override
            public Enumerator<Object[]> enumerator() {
                return new CsvEnumeratorDemo<>(fileReader);
            }
        };
    }

    /**
     * Builds the row type from the CSV header line.
     *
     * <p>The header is read from the underlying reader only on the first successful
     * call and cached afterwards, so this method can be invoked repeatedly. If reading
     * the header fails with an I/O error, the error is logged and an empty struct type
     * is returned; the read is attempted again on the next call.
     *
     * @param relDataTypeFactory factory used to create the column types and the struct type
     * @return a struct type with one field per header column, in header order
     * @throws IllegalStateException    if the stream contains no header line
     * @throws IllegalArgumentException if a column declaration is not {@code NAME:TYPE}
     *                                  or names an unknown SQL type
     * @since 0.1.0
     */
    @Override
    public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
        List<String> names = new ArrayList<>();
        List<RelDataType> types = new ArrayList<>();

        if (!headerRead) {
            try {
                headerLine = readHeaderLine();
                headerRead = true;
            } catch (IOException e) {
                log.error("IO异常", e);
                return relDataTypeFactory.createStructType(Pair.zip(names, types));
            }
        }
        if (headerLine == null) {
            throw new IllegalStateException("CSV stream is empty: no header line to derive the row type from");
        }

        // Header example: ID:VARCHAR,NAME1:VARCHAR,NAME2:VARCHAR
        for (String column : headerLine.split(StringSpliter.COMMA)) {
            String[] parts = column.split(StringSpliter.COLON);
            if (parts.length < 2) {
                throw new IllegalArgumentException(
                        "Malformed CSV header column '" + column + "', expected NAME:TYPE");
            }
            SqlTypeName typeName = SqlTypeName.get(parts[1].trim());
            if (typeName == null) {
                throw new IllegalArgumentException(
                        "Unknown SQL type '" + parts[1] + "' for CSV column '" + parts[0] + "'");
            }
            names.add(parts[0]);
            types.add(relDataTypeFactory.createSqlType(typeName));
        }
        return relDataTypeFactory.createStructType(Pair.zip(names, types));
    }

    /**
     * Reads the first line from the shared reader without buffering beyond it.
     *
     * <p>Characters are read one at a time on purpose: wrapping the reader in a
     * {@code BufferedReader} would pull data rows into a private buffer (and closing
     * that wrapper would close the shared reader), leaving nothing usable for the
     * enumerator. Lines are terminated by {@code \n}; a preceding {@code \r} is dropped.
     *
     * @return the header line without its line terminator, or {@code null} if the
     *         stream is already at end of input
     * @throws IOException if reading from the underlying reader fails
     */
    private String readHeaderLine() throws IOException {
        int ch = fileReader.read();
        if (ch == -1) {
            return null;
        }
        StringBuilder header = new StringBuilder();
        while (ch != -1 && ch != '\n') {
            header.append((char) ch);
            ch = fileReader.read();
        }
        int length = header.length();
        if (length > 0 && header.charAt(length - 1) == '\r') {
            header.setLength(length - 1);
        }
        return header.toString();
    }
}
